#!/bin/bash

function task_job() {
jdbcUrl="$1"
db="$2"
mysqlTable="$3"
hiveTable="$4"
column="${5}"
sub="${6}"
threadNumber="${7}"
typed="${8}"
dt="{{ execution_date | cst_ds }}"
nextdt="{{ execution_date | date_add(1) | cst_ds }}"
# dt="{{ execution_date | date_add(-5) | cst_ds }}"
# nextdt="{{ execution_date | date_add(-4) | cst_ds }}"
env="{{ var.value.env_sync }}"
username='{{ var.json.mysql_spmi.username }}'
password='{{ var.json.mysql_spmi.password }}'

json='{
"reader":{
"connect":{
"url":"'"${jdbcUrl}"'",
"username":"'"${username}"'",
"password":"'"${password}"'",
"driver":"com.mysql.cj.jdbc.Driver"
},
"dbtype":"mysql",
"tableName":"'"${db}.${mysqlTable}"'",
"subTableList":"'"${sub}"'",
"where":"'"${column} between str_to_date('${dt} 00:00:00', '%Y-%m-%d %H:%i:%s') and str_to_date('${dt} 23:59:59', '%Y-%m-%d %H:%i:%s')"'",
"query":"'""'",
"splitColumn":"'"${col}"'",
"equalitySectioning":0,
"containsnull":0,
"fetchsize":"1024",
"threadNumber":0
},
"channel":{
"filterAbnormalCharacter":0
},
"writer":{
"dag":"spmi_ods",
"task_id":"spmi_ods__spmi_operation_change_fee_bill",
"dbtype":"hive",
"tableName":"'"${hiveTable}"'",
"database":"spmi_ods",
"writeMode": "'"${typed}"'",
"partitionColumn":"dt",
"partitionValue":"'"${dt}"'"},
"settting":{
"env":"'"${env}"'"}
}'

/usr/hdp/3.1.4.0-315/spark2/bin/spark-submit \
--master yarn \
--deploy-mode cluster \
--driver-memory 18G \
--driver-cores 4 \
--executor-memory 8G \
--executor-cores 2 --queue pro  \
--num-executors ${threadNumber}  \
--conf spark.driver.maxResultSize=6g  \
--class com.yunlu.bigdata.jobs.synchrotool.DataSynchDriver \
hdfs:///scheduler/jms/spark/sync/mysql/spark_sync.jar  "$json"

echo ${json} >> /jms_ods/spmi_operation_change_fee_bill.log
echo "任务执行成功: ${dt} ${hiveTable} ${db} ${sub}"
}


jdbcUrl1='{{ var.json.mysql_spmi.spmibill07_yl_spmibill_opt_7_1 }}'
jdbcUrl2='{{ var.json.mysql_spmi.spmibill07_1_yl_spmibill_opt_7_2 }}'
jdbcUrl3='{{ var.json.mysql_spmi.spmibill3_yl_spmibill_opt_7_3 }}'
jdbcUrl4='{{ var.json.mysql_spmi.spmibill4_yl_spmibill_opt_7_4 }}'
jdbcUrl5='{{ var.json.mysql_spmi.spmibill07_yl_spmibill_opt_117_1 }}'
jdbcUrl6='{{ var.json.mysql_spmi.spmibill07_1_yl_spmibill_opt_117_2 }}'
jdbcUrl7='{{ var.json.mysql_spmi.spmibill3_yl_spmibill_opt_117_3 }}'
jdbcUrl8='{{ var.json.mysql_spmi.spmibill4_yl_spmibill_opt_117_4 }}'
jdbcUrl9='{{ var.json.mysql_spmi.spmibill07_yl_spmibill_opt_127 }}'

mTab="spmi_operation_change_fee_bill"
hTab="spmi_operation_change_fee_bill"
col='create_time'



task_job  ${jdbcUrl1}  yl_spmibill_opt_7_1      ${mTab} ${hTab} ${col} "[0,255]" 8  overwrite
task_job  ${jdbcUrl2}  yl_spmibill_opt_7_2      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl3}  yl_spmibill_opt_7_3      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl4}  yl_spmibill_opt_7_4      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl5}  yl_spmibill_opt_117_1      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl6}  yl_spmibill_opt_117_2      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl7}  yl_spmibill_opt_117_3      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl8}  yl_spmibill_opt_117_4      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &
task_job  ${jdbcUrl9}  yl_spmibill_opt_127      ${mTab} ${hTab} ${col} "[0,255]" 3  append    &

wait
echo "${hTab} 表所有任务执行成功! "
